#include "rocksdb_basic_operator.h"
#include "rocksdb/table.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/stackable_db.h"
#include "log.h"

#include <sys/stat.h>
#include <sys/types.h>

VET_INDEX_NAMESPACE_BEGIN

// Build an operator that is not yet attached to any database.
// The DB pointer starts out null, the environment is RocksDB's default Env,
// and the handle list, logger and directory are left empty until
// InitRocksDb() fills them in.
RocksDbOperator::RocksDbOperator()
    : p_rocksdb_db_(nullptr)
    , p_rocksdb_env_(rocksdb::Env::Default())
    , col_fam_handle_vet_()
    , logger_()
    , s_current_dir_()
{
}

// Shut the database down and release everything this object owns.
//
// Steps, in order: sync the WAL, destroy every column family handle, close
// the DB, free the DB object, then flush the logger.  A failing step is
// logged and the remaining steps still run, so a sync or close error never
// leaves handles or the DB object behind.
RocksDbOperator::~RocksDbOperator()
{
    // InitRocksDb() may never have been called, or may have failed before
    // the database was opened; in that case there is nothing to close.
    if ( p_rocksdb_db_ != NULL )
    {
        rocksdb::Status s = p_rocksdb_db_->SyncWAL();
        if ( !s.ok() )
        {
            // Keep going: bailing out here would leak the handles and the DB.
            log_error("close rocksdb failed! code:%d, subCode:%d, errMsg:%s", s.code(), s.subcode(), s.getState());
        }

        // Handles must be destroyed before the DB they belong to is closed.
        for ( auto handler : col_fam_handle_vet_ )
        {
            s = p_rocksdb_db_->DestroyColumnFamilyHandle(handler);
            if ( !s.ok() )
            {
                log_error("destroy rocksdb column family handle failed! code:%d, subCode:%d, errMsg:%s",
                    s.code(), s.subcode(), s.getState());
            }
        }
        col_fam_handle_vet_.clear();

        s = p_rocksdb_db_->Close();
        if ( !s.ok() )
        {
            // Close() reports Aborted when snapshots are still unreleased.
            log_error("close rocksdb failed! code:%d, subCode:%d, errMsg:%s", s.code(), s.subcode(), s.getState());
        }

        // Close() does not free the object; the DB must be deleted whatever
        // status Close() returned.
        delete p_rocksdb_db_;
        p_rocksdb_db_ = NULL;
    }

    // flush log
    if ( logger_ ) logger_->Flush();
}

// Open (or create) the RocksDB instance stored under s_db_dir.
//
// @param s_db_dir  directory that holds the database files.
// @return true when the options were prepared and the database and its
//         column families were opened; false otherwise (details are logged).
bool RocksDbOperator::InitRocksDb(
    const std::string& s_db_dir)
{
    // s_db_dir is a const reference, so this is (and always was) a copy.
    s_current_dir_ = s_db_dir;

    rocksdb::Options o_options;
    // Each helper returns 0 on success.  Stop at the first failure instead
    // of opening the database with half-configured options.
    if (setup_db_options(o_options) != 0 ||
        setup_col_fam_options(o_options) != 0 ||
        setup_block_based_table_options(o_options) != 0) {
        log_error("setup rocksdb options failed! path:%s", s_current_dir_.c_str());
        return false;
    }

    rocksdb::Status status = create_column_family(o_options);
    if (!status.ok()) {
        log_error("open rocksdb failed! path:%s, code:%d, subcode:%d, errmsg:%s",
            s_current_dir_.c_str(), status.code(), status.subcode(), status.getState());
        return false;
    }
    log_info("open rocksdb success! path:%s", s_current_dir_.c_str());
    return true;
}

// Store key/value only if the key is not already present.
//
// @return 0 on success, -1 when the key already exists, otherwise the error
//         code reported by the existence check or by ReplaceEntry().
int RocksDbOperator::InsertEntry(
        const std::string& key,
        const std::string& value,
        bool b_sync_mode, 
        int col_fam_type)
{
    const int i_exist = key_exist_check(key, col_fam_type);

    // 1 means "found": refuse to overwrite.
    if (i_exist == 1) {
        log_error("key:%s is alreadly in database:[%d]" , key.c_str(), col_fam_type);
        return -1;
    }

    // Anything other than 0 ("absent") is a lookup failure; pass it on.
    if (i_exist != 0) {
        log_error("key:%s check in database:[%d] has other errorId:[%d]" , 
                        key.c_str(),
                        col_fam_type,
                        i_exist);
        return i_exist;
    }

    const int i_put = ReplaceEntry(key, value, b_sync_mode, col_fam_type);
    if (0 != i_put) {
        log_error("insert key:%s in database:[%d] error, errorId:[%d]",
                key.c_str(),
                col_fam_type,
                i_put);
    }
    return i_put;
}

int RocksDbOperator::DeleteEntry(
        const std::string& key,
        bool b_sync_mode,
        int col_fam_type)
{
    rocksdb::WriteOptions write_opt;
    write_opt.sync = b_sync_mode;

    rocksdb::Status status = p_rocksdb_db_->Delete(write_opt,
                                 col_fam_handle_vet_[col_fam_type],
                                 key);

    return -status.code();
}

// Overwrite the value of a key that must already be present.
//
// @return 0 on success, -1 when the key does not exist, otherwise the error
//         code reported by the existence check or by ReplaceEntry().
int RocksDbOperator::UpdateEntry(
        const std::string& key,
        const std::string& value,
        bool b_sync_mode, 
        int col_fam_type)
{
    const int i_exist = key_exist_check(key, col_fam_type);

    // 0 means "absent": there is nothing to update.
    if (i_exist == 0) {
        log_error("key:%s is not exist in database:[%d]" , key.c_str(), col_fam_type);
        return -1;
    }

    // Anything other than 1 ("found") is a lookup failure; pass it on.
    if (i_exist != 1) {
        log_error("key:%s check in database:[%d] has other errorId:[%d]" , 
                        key.c_str(),
                        col_fam_type,
                        i_exist);
        return i_exist;
    }

    const int i_put = ReplaceEntry(key, value, b_sync_mode, col_fam_type);
    if (0 != i_put) {
        log_error("update key:%s in database:[%d] error, errorId:[%d]",
                key.c_str(),
                col_fam_type,
                i_put);
    }
    return i_put;
}

int RocksDbOperator::BatchUpdate(
                const std::set<std::string>& delete_set,
                const UpdatePair& update_vet,
                bool b_sync_mode,
                int col_fam_type)
{
    rocksdb::WriteOptions write_opt;
    write_opt.sync = b_sync_mode;

    rocksdb::WriteBatch update_batch;
    std::set<std::string>::iterator iter = delete_set.begin();
    for ( ; iter != delete_set.end(); ++ iter) {
        update_batch.Delete(col_fam_handle_vet_[col_fam_type],
            (*iter));
    }

    for (size_t i = 0; i < update_vet.size(); i++) {
        update_batch.Put(col_fam_handle_vet_[col_fam_type],
            update_vet[i].first,
            update_vet[i].second);
    }
    
    rocksdb::Status status = p_rocksdb_db_->Write(write_opt , &update_batch);
    return -status.code(); // 0: ok , -1: not found , other : see rocksdb defination
}

int RocksDbOperator::GetEntry(
        const std::string& key,
        std::string& value,
        int col_fam_type)
{
    rocksdb::ReadOptions read_opt;
    read_opt.total_order_seek = true;

    rocksdb::Status status = p_rocksdb_db_->Get(read_opt,
                                col_fam_handle_vet_[col_fam_type],
                                key,
                                &value);

    return -status.code(); // 0: ok , -1: not found , other : see rocksdb defination
}

bool RocksDbOperator::MultiGetEntry(
        const std::vector<int>& col_fam_type_vet,
        const std::vector<std::string>& keys,
        std::vector<std::string>& values,
        std::set<int>& error_seq_id,
        std::set<int>& no_found_seq_id)
{
    std::vector<rocksdb::ColumnFamilyHandle*> p_col_fam_vet;

    for (size_t i = 0; i < col_fam_type_vet.size(); i++) {
        p_col_fam_vet.emplace_back(col_fam_handle_vet_[col_fam_type_vet[i]]);
    }
    values.resize(keys.size());

    std::vector<rocksdb::Slice>  keys_slice(keys.begin(),keys.end());
    std::vector<rocksdb::Status> statues = p_rocksdb_db_->MultiGet(
                rocksdb::ReadOptions(),
                p_col_fam_vet , 
                keys_slice , 
                &values);

    bool b_ret = true;
    for (size_t i = 0; i < statues.size(); i++) {
        if (!statues[i].ok()) {
            if(!statues[i].IsNotFound()) {
                error_seq_id.insert(i);
                b_ret = false;
            } else {
                no_found_seq_id.insert(i);
            }
        }
    }
    return b_ret;
}

int RocksDbOperator::ReplaceEntry(
        const std::string& key,
        const std::string& value,
        bool b_sync_mode,
        int col_fam_type)
{
    rocksdb::WriteOptions write_opt;
    write_opt.sync = b_sync_mode;

    rocksdb::Status status = p_rocksdb_db_->Put(write_opt,
                             col_fam_handle_vet_[col_fam_type],
                             key,
                             value);
    return -status.code();
}

// Create an iterator over the column family col_fam_type.
//
// The iterator is configured with fill_cache off, a readahead size of 1 and
// total-order seek.  The raw pointer returned by DB::NewIterator() is handed
// straight to the caller.
rocksdb::Iterator* RocksDbOperator::GetIterator(
    int col_fam_type)
{
    rocksdb::ReadOptions scan_opt;
    scan_opt.total_order_seek = true;
    scan_opt.readahead_size = 1;
    scan_opt.fill_cache = false;

    rocksdb::ColumnFamilyHandle* const p_handle = col_fam_handle_vet_[col_fam_type];
    return p_rocksdb_db_->NewIterator(scan_opt, p_handle);
}

int RocksDbOperator::key_exist_check(
        const std::string& key,
        int col_fam_type)
{
    static rocksdb::ReadOptions read_opt = rocksdb::ReadOptions();
    read_opt.fill_cache = false;

    static std::string s_value;
    bool b_ret = p_rocksdb_db_->KeyMayExist(read_opt,
                         col_fam_handle_vet_[col_fam_type],
                         key,
                         &s_value);
    if (!b_ret) {
        return 0; // key definitely does not exist in the database
    }
    
    // double check here
    int i_ret = GetEntry(key , s_value , col_fam_type);
    if (0 == i_ret) {
        return 1; // key is exist
    } else if (-1 == i_ret) {
        return 0; // key is no exist
    }
    log_error("find key failed, ret:%d, key:%s", i_ret, key.c_str());
    return i_ret;
}

// Fill in the database-wide options: environment, directories, WAL and
// info-log limits, the info logger, and background-work limits.
//
// The database directory and the info-log directory are created here if
// they are missing.
//
// @param option  options object to configure in place.
// @return 0 on success, -1 when a directory or the logger cannot be created.
int RocksDbOperator::setup_db_options(
    rocksdb::DBOptions& option)
{
    option.skip_stats_update_on_db_open = true;
    option.IncreaseParallelism(5);      // use 5 background threads
    option.env = p_rocksdb_env_;        // config rocksdb env pointer
    option.create_if_missing = true;    // create the db if it does not exist

    if ( create_dir(s_current_dir_) != 0 ) return -1;
    option.wal_dir = s_current_dir_ + "/wallog";     // write ahead log persistent dir
    option.db_log_dir = s_current_dir_ + "/log";     // rocksdb running log persistent dir
    if ( create_dir(option.db_log_dir) != 0 ) return -1;

    // The shift must be done in 64 bits: `4 << 30` overflows a 32-bit int,
    // so the intended 4G limit was never what got stored.
    option.max_total_wal_size = 4ULL << 30; // total wal log size : 4G
    option.max_log_file_size = 50 << 20;    // per log file size : 50M
    option.keep_log_file_num = 20;          // keep 1G log in disk
    option.info_log_level = rocksdb::InfoLogLevel::ERROR_LEVEL; // default log level is error

    rocksdb::Status s = rocksdb::CreateLoggerFromOptions(s_current_dir_, option, &logger_);
    if ( !s.ok() || !logger_ ) {
        log_error("create rocksdb logger failed, code:%d, subcode:%d, errmsg:%s", 
            s.code(), s.subcode(), s.getState());
        return -1;
    }
    // The logger is created with the ERROR level set above; raise it to
    // DEBUG by hand because the application wants a different level on it.
    logger_->SetInfoLogLevel(rocksdb::InfoLogLevel::DEBUG_LEVEL);
    option.info_log = logger_;

    option.compaction_readahead_size = 2 << 20; // 2M
    option.max_background_jobs = 8;
    option.max_file_opening_threads = 3;
    option.max_manifest_file_size = 20 << 20; // 20M
    option.new_table_reader_for_compaction_inputs = true;
    option.max_subcompactions = 3;
    return 0;
}

// Fill in the column-family level options: write buffers, level layout,
// per-level compression and compaction triggers/limits.
//
// @param option  options object to configure in place.
// @return always 0.
int RocksDbOperator::setup_col_fam_options(
    rocksdb::ColumnFamilyOptions& option)
{
    // Write buffers (memtables).
    option.write_buffer_size = 64 << 20; // 64M
    option.max_write_buffer_number = 5;
    option.min_write_buffer_number_to_merge = 2;
    option.max_successive_merges = 1000;

    // Level layout and compaction style.
    option.num_levels = 7;
    option.level_compaction_dynamic_level_bytes = true;
    option.compaction_style = rocksdb::CompactionStyle::kCompactionStyleLevel;

    // One compression type per level: none for levels 0-1, snappy for
    // levels 2-5, zlib for the last level.
    option.compression_per_level = {
        rocksdb::CompressionType::kNoCompression,
        rocksdb::CompressionType::kNoCompression,
        rocksdb::CompressionType::kSnappyCompression,
        rocksdb::CompressionType::kSnappyCompression,
        rocksdb::CompressionType::kSnappyCompression,
        rocksdb::CompressionType::kSnappyCompression,
        rocksdb::CompressionType::kZlibCompression
    };

    // Level-0 file-count thresholds.
    option.level0_file_num_compaction_trigger = 2;
    option.level0_slowdown_writes_trigger = 20;
    option.level0_stop_writes_trigger = 36;

    // Size limits.
    option.target_file_size_base = 32 << 20;    // 32M
    option.max_bytes_for_level_base = 64 << 20; // 64M
    option.max_compaction_bytes = 100 << 20;    // 100M
    option.hard_pending_compaction_bytes_limit = 512ULL << 30;  // 512G
    return 0;
}

// Build a block-based table factory and install it as option.table_factory.
//
// @param option  options object whose table_factory is replaced.
// @return always 0.
int RocksDbOperator::setup_block_based_table_options(
    rocksdb::ColumnFamilyOptions& option)
{
    rocksdb::BlockBasedTableOptions table_opt;

    // Block size and in-block index.
    table_opt.block_size = 64 << 10; // 64KB
    table_opt.data_block_index_type =
            rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
    table_opt.data_block_hash_table_util_ratio = 0.75;

    // Block cache usage.
    table_opt.no_block_cache = false;
    table_opt.cache_index_and_filter_blocks = true;
    table_opt.pin_l0_filter_and_index_blocks_in_cache = true;
    table_opt.read_amp_bytes_per_bit = 0;

    // Whole-key bloom filter built with NewBloomFilterPolicy(10, false).
    table_opt.whole_key_filtering = true;
    table_opt.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));

    option.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_opt));
    return 0;
}

// Open the database under s_current_dir_ together with its column families.
//
// If no database is found there, a new one is opened and the two column
// families "E_COL_FAM_VECTOR_DATA" and "E_COL_FAM_META_DATA" are created in
// it.  Otherwise the existing database is opened with those two families
// plus the default one, in that order, so indexes 0 and 1 of
// col_fam_handle_vet_ mean the same thing on both paths.
//
// @param options  options used to list the column families and open the DB.
// @return the status of the first step that failed, or an OK status.
rocksdb::Status RocksDbOperator::create_column_family(
    const rocksdb::Options& options)
{
    std::vector<std::string> s_col_fam_name;
    rocksdb::Status status = rocksdb::DB::ListColumnFamilies(
        options,
        s_current_dir_,
        &s_col_fam_name);
    if (!status.ok() && !status.IsPathNotFound()) {
        return status;
    }

    // NOTE(review): these options start from RocksDB defaults and take
    // nothing from `options`, so column-family settings carried by
    // `options` do not reach the families opened or created below --
    // confirm that this is intended.
    rocksdb::ColumnFamilyOptions col_fam_opt;
    col_fam_opt.max_write_buffer_number = 5;
    col_fam_opt.min_write_buffer_number_to_merge = 1;
    col_fam_opt.memtable_prefix_bloom_size_ratio = 0.25;
    col_fam_opt.memtable_whole_key_filtering = true;
    col_fam_opt.optimize_filters_for_hits = true;

    if (status.IsPathNotFound() || s_col_fam_name.empty()) {
        // No database yet: open a new one, then add our column families.
        status = rocksdb::DB::Open(options , s_current_dir_ , &p_rocksdb_db_);
        if (!status.ok()) {
            return status;
        }

        s_col_fam_name.push_back("E_COL_FAM_VECTOR_DATA");
        s_col_fam_name.push_back("E_COL_FAM_META_DATA");
        // Hand the result back to the caller; dropping it would report
        // success for a database that has no usable column families.
        return p_rocksdb_db_->CreateColumnFamilies(col_fam_opt ,
                                                   s_col_fam_name ,
                                                   &col_fam_handle_vet_);
    }

    assert(s_col_fam_name.size() == E_COL_FAM_TOTAL_NUM);

    std::vector<rocksdb::ColumnFamilyDescriptor> col_fam_vet;
    col_fam_vet.push_back(rocksdb::ColumnFamilyDescriptor("E_COL_FAM_VECTOR_DATA",col_fam_opt));
    col_fam_vet.push_back(rocksdb::ColumnFamilyDescriptor("E_COL_FAM_META_DATA",col_fam_opt));
    col_fam_vet.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName,col_fam_opt));

    return rocksdb::DB::Open(options,
                             s_current_dir_,
                             col_fam_vet,
                             &col_fam_handle_vet_,
                             &p_rocksdb_db_);
}

// Make sure the directory s_path exists, creating it (mode 0755) if needed.
// Only the last path component is created; parent directories must exist.
//
// @param s_path  directory to create.
// @return 0 when the directory exists on return, -1 otherwise (the reason is
//         logged).
int RocksDbOperator::create_dir(
    const std::string& s_path)
{
    // Try to create first and look at the error afterwards.  Checking for
    // existence first leaves a window in which another process can create
    // the directory, which would turn into a spurious failure.
    if ( mkdir(s_path.c_str(), 0755) == 0 )
    {
        return 0;
    }

    // Save errno before any other call can overwrite it.
    const int i_mkdir_errno = errno;
    if ( i_mkdir_errno != EEXIST )
    {
        log_error("create rocksdb dir failed! path:%s, errno:%d", s_path.c_str(), i_mkdir_errno);
        return -1;
    }

    // Something already has this name; it only counts if it is a directory.
    struct stat st;
    if ( stat(s_path.c_str(), &st) != 0 )
    {
        const int i_stat_errno = errno;
        log_error("access rocksdb stuff dir failed!, path:%s, errno:%d", s_path.c_str(), i_stat_errno);
        return -1;
    }
    if ( !S_ISDIR(st.st_mode) )
    {
        log_error("rocksdb path exists but is not a directory! path:%s", s_path.c_str());
        return -1;
    }

    return 0;
}

VET_INDEX_NAMESPACE_END
